{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Running Dask on the cluster with MLRun\n",
    "\n",
    "The Dask framework enables you to parallelize your Python code and run it as a distributed process on an Iguazio cluster and dramatically accelerate the performance.\n",
    "In this notebook you'll learn how to create a Dask cluster and then an MLRun function running as a Dask client.\n",
    "It also demonstrates how to run parallelize custom algorithm using Dask Delayed option.\n",
    "\n",
    "For more information on Dask over Kubernetes: https://kubernetes.dask.org/en/latest/.\n",
    "\n",
    "```{admonition} Note\n",
    "Dask is currently in Tech Preview status.\n",
    "```\n",
    "\n",
    "**In this section**\n",
    "- [Set up the environment](#set-up-the-environment)\n",
    "- [Create and start the Dask cluster](#create-and-start-the-dask-cluster)\n",
    "- [Creating a function that runs over Dask](#creating-a-function-that-runs-over-dask)\n",
    "- [Test the function over Dask](#test-the-function-over-dask)\n",
    "- [Track the progress in the UI](#track-the-progress-in-the-ui)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set up the environment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2024-10-02 08:12:17,022 [info] Created and saved project: {\"context\":\"./\",\"from_template\":null,\"name\":\"dask-demo\",\"overwrite\":false,\"save\":true}\n",
      "> 2024-10-02 08:12:17,026 [info] Project created successfully: {\"project_name\":\"dask-demo\",\"stored_in_db\":true}\n"
     ]
    }
   ],
   "source": [
    "# set mlrun api path and artifact path for logging\n",
    "import mlrun\n",
    "\n",
    "project = mlrun.get_or_create_project(\"dask-demo\", \"./\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create and start the Dask cluster\n",
    "Dask functions can be local (local workers), or remote (use containers in the cluster). In the case of remote you \n",
    "can specify the number of replicas (optional) or leave blank for auto-scale.<br>\n",
    "Use the `set()` to define the Dask cluster and set the desired configuration of that clustered function.\n",
    "\n",
    "If the Dask workers need to access the shared file system, apply a shared volume mount (e.g. via v3io mount).\n",
    "\n",
    "The Dask function spec has several unique attributes (in addition to the standard job attributes):\n",
    "\n",
    "* **.remote** &mdash; bool, use local or clustered dask\n",
    "* **.replicas** &mdash; number of desired replicas, keep 0 for auto-scale\n",
    "* **.min_replicas**, **.max_replicas** &mdash; set replicas range for auto-scale\n",
    "* **.scheduler_timeout** &mdash; cluster is killed after timeout (inactivity), default is '60 minutes', should be at least 5 minutes to avoid transient issues\n",
    "* **.nthreads** &mdash; number of worker threads\n",
    "<br>\n",
    "\n",
    "If you want to access the Dask dashboard or scheduler from remote you need to use NodePort service type (set `.service_type` to 'NodePort'), and the external IP need to be specified in the MLRun configuration (mlconf.remote_host). This is set automatically if you are running on an Iguazio cluster.\n",
    "\n",
    "Specify the kind (dask) and the container image:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<mlrun.runtimes.daskjob.DaskCluster at 0x7f965e52b610>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# create an mlrun function that will init the dask cluster\n",
    "dask_cluster_name = \"dask-cluster\"\n",
    "dask_cluster = project.set_function(\n",
    "    name=dask_cluster_name, kind=\"dask\", image=\"mlrun/mlrun\"\n",
    ")\n",
    "dask_cluster.apply(mlrun.mount_v3io())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# set range for # of replicas with replicas and max_replicas\n",
    "dask_cluster.spec.min_replicas = 1\n",
    "dask_cluster.spec.max_replicas = 4\n",
    "\n",
    "# set the use of dask remote cluster (distributed)\n",
    "dask_cluster.spec.remote = True\n",
    "dask_cluster.spec.service_type = \"NodePort\"\n",
    "\n",
    "# set dask memory and cpu limits\n",
    "dask_cluster.with_worker_requests(mem=\"2G\", cpu=\"2\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initialize the Dask Cluster\n",
    "\n",
    "When you request the dask cluster `client` attribute, it verifies that the cluster is up and running:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "import warnings\n",
    "\n",
    "# Ignore mismatched versions warning logged by dask\n",
    "warnings.simplefilter(\"ignore\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2024-10-02 08:13:56,759 [info] Trying dask client at: tcp://mlrun-dask-cluster-05bbaed5-3.default-tenant:8786\n",
      "> 2024-10-02 08:13:56,833 [info] Using remote dask scheduler (mlrun-dask-cluster-05bbaed5-3) at: tcp://mlrun-dask-cluster-05bbaed5-3.default-tenant:8786\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a href=\"http://default-tenant.app.vmdev17.lab.iguazeng.com:31898/status\" target=\"_blank\" >dashboard link: default-tenant.app.vmdev17.lab.iguazeng.com:31898</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "    <div style=\"width: 24px; height: 24px; background-color: #e1e1e1; border: 3px solid #9D9D9D; border-radius: 5px; position: absolute;\"> </div>\n",
       "    <div style=\"margin-left: 48px;\">\n",
       "        <h3 style=\"margin-bottom: 0px;\">Client</h3>\n",
       "        <p style=\"color: #9D9D9D; margin-bottom: 0px;\">Client-4548a53d-8096-11ef-a9bc-6e177956313e</p>\n",
       "        <table style=\"width: 100%; text-align: left;\">\n",
       "\n",
       "        <tr>\n",
       "        \n",
       "            <td style=\"text-align: left;\"><strong>Connection method:</strong> Direct</td>\n",
       "            <td style=\"text-align: left;\"></td>\n",
       "        \n",
       "        </tr>\n",
       "\n",
       "        \n",
       "            <tr>\n",
       "                <td style=\"text-align: left;\">\n",
       "                    <strong>Dashboard: </strong> <a href=\"http://mlrun-dask-cluster-05bbaed5-3.default-tenant:8787/status\" target=\"_blank\">http://mlrun-dask-cluster-05bbaed5-3.default-tenant:8787/status</a>\n",
       "                </td>\n",
       "                <td style=\"text-align: left;\"></td>\n",
       "            </tr>\n",
       "        \n",
       "\n",
       "        </table>\n",
       "\n",
       "        \n",
       "\n",
       "        \n",
       "            <details>\n",
       "            <summary style=\"margin-bottom: 20px;\"><h3 style=\"display: inline;\">Scheduler Info</h3></summary>\n",
       "            <div style=\"\">\n",
       "    <div>\n",
       "        <div style=\"width: 24px; height: 24px; background-color: #FFF7E5; border: 3px solid #FF6132; border-radius: 5px; position: absolute;\"> </div>\n",
       "        <div style=\"margin-left: 48px;\">\n",
       "            <h3 style=\"margin-bottom: 0px;\">Scheduler</h3>\n",
       "            <p style=\"color: #9D9D9D; margin-bottom: 0px;\">Scheduler-0a2c3809-fca1-4535-bf7f-1457230f1857</p>\n",
       "            <table style=\"width: 100%; text-align: left;\">\n",
       "                <tr>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Comm:</strong> tcp://10.233.92.190:8786\n",
       "                    </td>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Workers:</strong> 0\n",
       "                    </td>\n",
       "                </tr>\n",
       "                <tr>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Dashboard:</strong> <a href=\"http://10.233.92.190:8787/status\" target=\"_blank\">http://10.233.92.190:8787/status</a>\n",
       "                    </td>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Total threads:</strong> 0\n",
       "                    </td>\n",
       "                </tr>\n",
       "                <tr>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Started:</strong> 1 minute ago\n",
       "                    </td>\n",
       "                    <td style=\"text-align: left;\">\n",
       "                        <strong>Total memory:</strong> 0 B\n",
       "                    </td>\n",
       "                </tr>\n",
       "            </table>\n",
       "        </div>\n",
       "    </div>\n",
       "\n",
       "    <details style=\"margin-left: 48px;\">\n",
       "        <summary style=\"margin-bottom: 20px;\">\n",
       "            <h3 style=\"display: inline;\">Workers</h3>\n",
       "        </summary>\n",
       "\n",
       "        \n",
       "\n",
       "    </details>\n",
       "</div>\n",
       "            </details>\n",
       "        \n",
       "\n",
       "    </div>\n",
       "</div>"
      ],
      "text/plain": [
       "<Client: 'tcp://10.233.92.190:8786' processes=0 threads=0, memory=0 B>"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# init dask client and use the scheduler address as param in the following cell\n",
    "dask_cluster.client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a function that runs over Dask"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "# mlrun: start-code"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Import mlrun and dask. Nuclio is only used to convert the code into an MLRun function."
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {
    "vscode": {
     "languageId": "raw"
    }
   },
   "source": [
    "%nuclio config kind = \"job\"\n",
    "%nuclio config spec.image = \"mlrun/mlrun\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client\n",
    "from dask import dataframe as dd\n",
    "\n",
    "import mlrun"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Python function code"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This simple function reads a .csv file using dask dataframe. It runs the `groupby` and `describe` functions on the dataset, and stores the results as a dataset artifact.<br>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_dask(\n",
    "    context, dataset: mlrun.DataItem, client=None, dask_function: str = None\n",
    ") -> None:\n",
    "    # setup dask client from the MLRun dask cluster function\n",
    "    if dask_function:\n",
    "        client = mlrun.import_function(dask_function).client\n",
    "    elif not client:\n",
    "        client = Client()\n",
    "\n",
    "    # load the dataitem as dask dataframe (dd)\n",
    "    df = dataset.as_df(df_module=dd)\n",
    "\n",
    "    # run describe (get statistics for the dataframe) with dask\n",
    "    df_describe = df.describe().compute()\n",
    "\n",
    "    # run groupby and count using dask\n",
    "    df_grpby = df.groupby(\"VendorID\").count().compute()\n",
    "\n",
    "    context.log_dataset(\"describe\", df=df_grpby, format=\"csv\", index=True)\n",
    "    return"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# mlrun: end-code"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Test the function over Dask"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Load sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "DATA_URL = \"/User/examples/ytrip.csv\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current\n",
      "                                 Dload  Upload   Total   Spent    Left  Speed\n",
      " 54 84.9M   54 46.4M    0     0  1653k      0  0:00:52  0:00:28  0:00:24 1680k5M    0     0  1655k      0  0:00:52  0:00:20  0:00:32 1529k"
     ]
    }
   ],
   "source": [
    "!mkdir -p /User/examples/\n",
    "!curl -L \"https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv\" > {DATA_URL}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Convert the code to MLRun function"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use `set_function` to convert the code to MLRun function and specify the configuration for the Dask process (e.g. replicas, memory etc.). <br>\n",
    "Note that the resource configurations are per worker."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# mlrun transforms the code above (up to nuclio: end-code cell) into serverless function\n",
    "# which runs in k8s pods\n",
    "fn = project.set_function(name=\"test-dask\", kind=\"job\", handler=\"test_dask\").apply(\n",
    "    mlrun.mount_v3io()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Run the function"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "When running the function there is a link as part of the result. Click the link to go to the Dask monitoring dashboard."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# function URI is db://<project>/<name>\n",
    "dask_uri = f\"db://{project.name}/{dask_cluster_name}\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "r = fn.run(\n",
    "    handler=test_dask,\n",
    "    inputs={\"dataset\": DATA_URL},\n",
    "    params={\"dask_function\": dask_uri},\n",
    "    auto_build=True,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Track the progress in the UI"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can view the progress and detailed information in the MLRun UI by clicking on the uid above. <br>\n",
    "To track the dask progress: in the Dask UI click the \"dashboard link\" above the \"client\" section."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.18"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
